import pandas as pd

import xlwings as xw

import numpy as np

import os

from typing import Dict, List, Tuple, Any, Optional

class EnhancedCostAnalysisTemplate:

"""

增强版成本分析模板处理器

解决表间关联和数据完整性问题

"""

def __init__(self):

# 主键映射关系

self.key_mappings = {}

def process_single_file(self, file_path: str, sheet_name: str = None) -> Dict[str, Any]:

"""

处理单个成本表文件,建立完整的关联关系

"""

app = xw.App(visible=False)

wb = app.books.open(file_path)

if sheet_name is None:

sheet_name = wb.sheets[0].name

ws = wb.sheets[sheet_name]

result = {

'file_name': os.path.basename(file_path),

'sheet_name': sheet_name,

'product_info': {},

'cost_details': None,

'summary': {},

'comments': [],

'process_details': None, # 改为复数,包含多个工艺信息

'material_parameters': None, # 材料参数表

'key_relationships': {} # 存储主键关联关系

}

try:

# 1. 提取产品基本信息,并生成主键

result['product_info'] = self.extract_product_info_with_key(ws)

product_key = result['product_info'].get('product_key')

# 2. 提取成本明细,关联产品主键

cost_details = self.extract_cost_details_with_relationships(ws, product_key)

result['cost_details'] = cost_details

# 3. 提取汇总信息,关联产品主键

result['summary'] = self.extract_summary_with_key(ws, product_key)

# 4. 提取批注信息

result['comments'] = self.extract_comments(ws)

# 5. 提取完整的工艺信息表(多个半成品)

process_details = self.extract_complete_process_info(ws, product_key)

result['process_details'] = process_details

# 6. 提取材料参数信息

result['material_parameters'] = self.extract_material_parameters(ws, product_key)

# 7. 建立完整的主键关联映射

result['key_relationships'] = self.build_key_relationships(

product_key, cost_details, process_details

)

except Exception as e:

print(f"处理文件时出错: {e}")

raise

finally:

wb.close()

app.quit()

return result

def extract_product_info_with_key(self, ws) -> Dict[str, Any]:

"""提取产品基本信息并生成主键"""

product_info = {}

# 查找产品名称

product_name_row = self.find_row_by_text(ws, "产品名称")

if product_name_row:

product_name = ws.range(f"C{product_name_row}").value

product_info['产品名称'] = product_name

# 查找产品编码

product_code_row = self.find_row_by_text(ws, "产品编码")

if product_code_row:

product_code = ws.range(f"L{product_code_row}").value

product_info['产品编码'] = product_code

# 生成产品主键

if product_info.get('产品编码'):

product_info['product_key'] = f"PROD_{product_info['产品编码']}"

else:

product_info['product_key'] = f"PROD_{hash(str(product_info))}"

# 提取其他基本信息

date_row = self.find_row_by_text(ws, "日期")

if date_row:

product_info['日期'] = ws.range(f"P{date_row}").value

# 提取制单、审核等信息

maker_row = self.find_row_by_text(ws, "制单")

if maker_row:

product_info['制单'] = ws.range(f"P{maker_row}").value

product_info['审核'] = ws.range(f"Q{maker_row}").value

product_info['批准'] = ws.range(f"R{maker_row}").value

product_info['流程号'] = ws.range(f"S{maker_row}").value

return product_info

def extract_cost_details_with_relationships(self, ws, product_key: str) -> pd.DataFrame:

"""提取成本明细并建立关联关系"""

header_row = self.find_header_row(ws, "序号")

if header_row is None:

raise ValueError("未找到成本明细表头")

headers = self.get_headers(ws, header_row)

end_row = self.find_data_end_row(ws, header_row + 1)

if end_row <= header_row:

raise ValueError("未找到成本明细数据")

# 读取数据区域

data_range = ws.range(f"A{header_row + 1}:T{end_row}")

data_values = data_range.value

# 创建DataFrame

df = pd.DataFrame(data_values, columns=headers)

df = self.clean_cost_data(df)

# 添加主键关联

df['product_key'] = product_key

df['cost_item_id'] = [f"COST_{product_key}_{i + 1}" for i in range(len(df))]

# 提取类型和维度值信息(用于关联工艺信息)

df = self.extract_type_and_dimension(ws, header_row + 1, end_row, df)

return df

def extract_type_and_dimension(self, ws, start_row: int, end_row: int, df: pd.DataFrame) -> pd.DataFrame:

"""提取类型和维度值信息"""

types = []

dimensions = []

for row in range(start_row, end_row + 1):

try:

# 提取类型(U列)

cell_type = ws.range(f"U{row}").value

types.append(cell_type if cell_type is not None else "")

# 提取维度值(V列)

cell_dim = ws.range(f"V{row}").value

dimensions.append(cell_dim if cell_dim is not None else "")

except:

types.append("")

dimensions.append("")

# 确保长度匹配

if len(types) == len(df):

df['类型'] = types

df['维度值'] = dimensions

return df

def extract_summary_with_key(self, ws, product_key: str) -> Dict[str, Any]:

"""提取汇总信息并关联主键"""

summary = {'product_key': product_key}

# 查找合计行

total_row = self.find_row_by_text(ws, "合计", start_row=10, end_row=30)

if total_row:

# 提取主要成本数据

summary['材料总金额'] = ws.range(f"M{total_row}").value

summary['直接人工合计'] = ws.range(f"O{total_row}").value

summary['制造费用合计'] = ws.range(f"P{total_row}").value

summary['制造成本合计'] = ws.range(f"Q{total_row}").value

summary['GRF费用'] = ws.range(f"R{total_row}").value

summary['S费用'] = ws.range(f"S{total_row}").value

summary['总成本'] = ws.range(f"T{total_row}").value

# 查找价格信息

for row in range(total_row - 5, total_row + 5):

try:

cell_value = ws.range(f"U{row}").value

if cell_value:

if "基准价格" in str(cell_value):

summary['基准价格'] = ws.range(f"V{row}").value

elif "销售价格" in str(cell_value):

summary['销售价格'] = ws.range(f"V{row}").value

summary['毛利率'] = ws.range(f"W{row}").value

except:

continue

return summary

def extract_complete_process_info(self, ws, product_key: str) -> pd.DataFrame:

"""提取完整的工艺信息表(多个半成品)"""

# 查找工艺信息表头

process_header_row = self.find_row_by_text(ws, "产品代码", start_row=25, end_row=40)

if not process_header_row:

return None

# 获取工艺信息表头

process_headers = [

"产品代码", "模穴数", "单重(G)", "毛重(G)", "材料利用率",

"水口比重", "原料描述", "材料单价", "材料成本", "生产机台",

"成型周期/冲速", "小时产量", "小时费用(元)", "制造费用",

"制造成本", "装配产能(PCS/H)", "装配设备", "实际成本", "水口比例"

]

# 查找数据结束行

process_end_row = self.find_process_data_end_row(ws, process_header_row + 1)

if process_end_row <= process_header_row:

return None

# 读取工艺数据

data_values = []

for row in range(process_header_row + 1, process_end_row + 1):

row_data = []

for col in range(1, 20): # A到S列

try:

cell_value = ws.range(row, col).value

row_data.append(cell_value)

except:

row_data.append(None)

data_values.append(row_data)

# 创建DataFrame

df = pd.DataFrame(data_values, columns=process_headers)

df = df.dropna(how='all') # 删除空行

# 添加主键关联

df['product_key'] = product_key

df['process_item_id'] = [f"PROC_{product_key}_{i + 1}" for i in range(len(df))]

return df

def extract_material_parameters(self, ws, product_key: str) -> pd.DataFrame:

"""提取材料参数信息"""

# 查找可能包含材料参数的区域

material_data = []

# 在成本明细中提取材料相关信息

header_row = self.find_header_row(ws, "序号")

if header_row:

end_row = self.find_data_end_row(ws, header_row + 1)

for row in range(header_row + 1, end_row + 1):

try:

product_name = ws.range(f"B{row}").value

material_desc = ws.range(f"F{row}").value

usage = ws.range(f"K{row}").value

unit_price = ws.range(f"L{row}").value

if product_name and material_desc:

material_data.append({

'产品名称': product_name,

'材料描述': material_desc,

'用量': usage,

'材料单价': unit_price,

'product_key': product_key

})

except:

continue

return pd.DataFrame(material_data)

def find_process_data_end_row(self, ws, start_row: int) -> int:

"""查找工艺数据结束行"""

row = start_row

while row < start_row + 20: # 限制搜索范围

try:

# 检查产品代码列是否有值

cell_value = ws.range(f"A{row}").value

if cell_value is None or cell_value == "":

return row - 1

row += 1

except:

return row - 1

return row - 1

def build_key_relationships(self, product_key: str, cost_details: pd.DataFrame,

process_details: pd.DataFrame) -> Dict[str, Any]:

"""建立完整的主键关联关系"""

relationships = {

'product_key': product_key,

'cost_to_process': {},

'process_to_cost': {}

}

if cost_details is not None and process_details is not None:

# 通过产品代码建立成本明细与工艺信息的关联

for _, cost_row in cost_details.iterrows():

if cost_row.get('类型') == '半成品' and cost_row.get('产品编码'):

product_code = cost_row['产品编码']

# 在工艺信息中查找匹配的产品代码

matching_process = process_details[process_details['产品代码'] == product_code]

if not matching_process.empty:

cost_id = cost_row.get('cost_item_id')

process_id = matching_process.iloc[0].get('process_item_id')

relationships['cost_to_process'][cost_id] = process_id

relationships['process_to_cost'][process_id] = cost_id

return relationships

# 保留原有的辅助方法

def find_header_row(self, ws, header_text: str) -> Optional[int]:

for row in range(1, 50):

try:

cell_value = ws.range(f"A{row}").value

if cell_value and header_text in str(cell_value):

return row

except:

continue

return None

def find_row_by_text(self, ws, text: str, start_row: int = 1, end_row: int = 50) -> Optional[int]:

for row in range(start_row, end_row + 1):

try:

cell_value = ws.range(f"A{row}").value

if cell_value and text in str(cell_value):

return row

except:

continue

return None

def get_headers(self, ws, header_row: int) -> List[str]:

headers = []

for col in range(1, 21): # A到T列

try:

cell_value = ws.range(header_row, col).value

headers.append(cell_value if cell_value is not None else f"Col_{col}")

except:

headers.append(f"Col_{col}")

return headers

def find_data_end_row(self, ws, start_row: int) -> int:

row = start_row

while row < start_row + 50: # 限制搜索范围

try:

cell_value = ws.range(f"A{row}").value

if cell_value is None or (isinstance(cell_value, str) and "合计" in cell_value):

return row - 1

if isinstance(cell_value, (int, float)) and cell_value > 0:

row += 1

else:

return row - 1

except:

return row - 1

return row - 1

def clean_cost_data(self, df: pd.DataFrame) -> pd.DataFrame:

df = df.dropna(how='all')

df = df.dropna(axis=1, how='all')

df = df.reset_index(drop=True)

numeric_columns = ['用量', '材料单价', '材料金额', '材料比重', '直接人工', '制造费用']

for col in numeric_columns:

if col in df.columns:

df[col] = pd.to_numeric(df[col], errors='coerce')

return df

def extract_comments(self, ws) -> List[Dict[str, Any]]:

comments = []

try:

all_cells = ws.used_range

for row in all_cells.rows:

for cell in row:

try:

if hasattr(cell, 'comment') and cell.comment:

comment_text = cell.comment.text if cell.comment.text else ''

comments.append({

'cell_address': cell.address,

'cell_value': cell.value,

'comment_text': comment_text.strip(),

'row': cell.row,

'column': cell.column

})

except:

continue

except Exception as e:

print(f"提取批注时出错: {e}")

return comments

def export_enhanced_to_excel(result, output_path):

"""将增强版处理结果导出到Excel文件"""

app = xw.App(visible=False)

wb = app.books.add()

try:

# 1. 产品信息表(带主键)

product_sheet = wb.sheets.add("产品信息")

product_info = result['product_info']

product_sheet.range('A1').value = "产品信息(主表)"

row = 2

for key, value in product_info.items():

product_sheet.range(f'A{row}').value = key

product_sheet.range(f'B{row}').value = value

row += 1

# 2. 成本明细表(关联产品主键)

if result['cost_details'] is not None:

cost_sheet = wb.sheets.add("成本明细")

cost_sheet.range('A1').value = "成本明细(关联表)"

cost_sheet.range('A2').value = result['cost_details']

# 3. 汇总信息表(关联产品主键)

summary_sheet = wb.sheets.add("汇总信息")

summary_sheet.range('A1').value = "汇总信息(关联表)"

summary_info = result['summary']

row = 2

for key, value in summary_info.items():

summary_sheet.range(f'A{row}').value = key

summary_sheet.range(f'B{row}').value = value

row += 1

# 4. 工艺信息表(多个半成品,关联产品主键)

if result['process_details'] is not None and not result['process_details'].empty:

process_sheet = wb.sheets.add("工艺信息")

process_sheet.range('A1').value = "工艺信息(关联表)"

process_sheet.range('A2').value = result['process_details']

# 5. 材料参数表

if result['material_parameters'] is not None and not result['material_parameters'].empty:

material_sheet = wb.sheets.add("材料参数")

material_sheet.range('A1').value = "材料参数(关联表)"

material_sheet.range('A2').value = result['material_parameters']

# 6. 关联关系表

relation_sheet = wb.sheets.add("关联关系")

relation_sheet.range('A1').value = "主键关联关系"

relationships = result['key_relationships']

row = 2

relation_sheet.range(f'A{row}').value = "产品主键"

relation_sheet.range(f'B{row}').value = relationships.get('product_key')

row += 2

relation_sheet.range(f'A{row}').value = "成本→工艺关联"

row += 1

for cost_id, process_id in relationships.get('cost_to_process', {}).items():

relation_sheet.range(f'A{row}').value = cost_id

relation_sheet.range(f'B{row}').value = process_id

row += 1

# 7. 批注信息表

if result['comments']:

comments_sheet = wb.sheets.add("批注信息")

comments_df = pd.DataFrame(result['comments'])

comments_sheet.range('A1').value = comments_df

wb.save(output_path)

print(f"完整数据已成功导出到: {output_path}")

except Exception as e:

print(f"导出数据时出错: {e}")

finally:

wb.close()

app.quit()

def save_to_database_enhanced(result, connection_string: str, table_prefix: str = ""):

"""将增强版数据保存到数据库"""

# 保存产品信息(主表)

product_info = result['product_info']

# 这里可以添加数据库保存逻辑

# 保存成本明细(关联表)

if result['cost_details'] is not None:

df_cost = result['cost_details']

# df_cost.to_sql(f"{table_prefix}cost_details", con=connection_string, if_exists='append', index=False)

print(f"成本明细数据已保存,记录数: {len(df_cost)}")

# 保存工艺信息(关联表)

if result['process_details'] is not None and not result['process_details'].empty:

df_process = result['process_details']

# df_process.to_sql(f"{table_prefix}process_details", con=connection_string, if_exists='append', index=False)

print(f"工艺信息数据已保存,记录数: {len(df_process)}")

# 保存材料参数

if result['material_parameters'] is not None and not result['material_parameters'].empty:

df_material = result['material_parameters']

# df_material.to_sql(f"{table_prefix}material_params", con=connection_string, if_exists='append', index=False)

print(f"材料参数数据已保存,记录数: {len(df_material)}")

# 保存关联关系

relationships = result['key_relationships']

# 这里可以保存关联关系到数据库

print("所有数据表已建立完整关联关系,可以用于数据库查询和关联分析")

# 使用示例

if __name__ == "__main__":

path = r"E:\韦瑞奎\pdf\整理成本表到数据库"

# 创建增强版处理器

processor = EnhancedCostAnalysisTemplate()

# 处理单个文件

file_path = os.path.join(path, "1884-288-38-8.xlsx")

result = processor.process_single_file(file_path)

# 打印处理结果

print(f"文件: {result['file_name']}")

print(f"产品主键: {result['product_info'].get('product_key')}")

print(f"成本明细记录数: {len(result['cost_details']) if result['cost_details'] is not None else 0}")

print(f"工艺信息记录数: {len(result['process_details']) if result['process_details'] is not None else 0}")

print(f"材料参数记录数: {len(result['material_parameters']) if result['material_parameters'] is not None else 0}")

print(f"关联关系数量: {len(result['key_relationships'].get('cost_to_process', {}))}")

# 导出完整数据

output_file = os.path.join(path, "完整导出数据.xlsx")

export_enhanced_to_excel(result, output_file)

# 模拟数据库保存

# save_to_database_enhanced(result, "your_connection_string")